package co.recloud.ariadne.store;

import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * In-memory, size-bounded cache of versioned rows, evicted in
 * least-recently-used order.
 *
 * <p>Rows are addressed by a {@code path} and a {@code time}. Each path owns a
 * time-sorted set of rows; a lookup at time {@code t} returns the row stored
 * at the greatest time {@code <= t}. A row written at a new time starts as a
 * copy of the closest earlier row of the same path and is then overlaid with
 * the supplied non-null columns.
 *
 * <p>The cache size is the sum of {@link #getSize(Map)} over all stored rows.
 * Whole paths are evicted, least recently used first, to make room for new
 * data. Both {@link #get} and {@link #put} count as a use of the path.
 *
 * <p>Thread-safety: every public instance method synchronizes on the cache
 * instance, so the recency list, the per-path row maps and the size counter
 * are only ever touched while holding that monitor.
 */
public class MemoryCache {
	/** Per-path node of the doubly linked recency list. */
	private static class Entry {
		/** Neighbour towards the most recently used end, or null. */
		Entry next;
		/** Neighbour towards the least recently used end, or null. */
		Entry prev;
		/** Rows by time; a null value marks "no data known at this time". */
		final SortedMap<Long, Map<String, Object>> row =
				new TreeMap<Long, Map<String, Object>>();
		final String key;
		/** Sum of getSize(...) over all non-null rows of this entry. */
		long size;

		Entry(String key) {
			this.key = key;
		}
	}

	private static final Long MEMCACHE_MAX = 102400000L;

	private static MemoryCache singleton = null;

	/** Path to entry; only accessed while holding the instance monitor. */
	private final ConcurrentMap<String, Entry> data;
	/** Upper bound for {@link #size}. */
	private final long capacity;
	/** Sum of {@code Entry.size} over all entries in {@link #data}. */
	private long size;
	/** Least recently used entry (next eviction victim), or null if empty. */
	private Entry lru;
	/** Most recently used entry, or null if empty. */
	private Entry mru;

	/**
	 * Returns the process-wide cache, creating it with the default capacity
	 * on first use.
	 *
	 * @return the shared instance
	 */
	public static synchronized MemoryCache getInstance() {
		if (singleton == null) {
			singleton = new MemoryCache(MEMCACHE_MAX);
		}
		return singleton;
	}

	/**
	 * Creates an empty cache.
	 *
	 * @param capacity maximum total row size, in the units of
	 *                 {@link #getSize(Map)}
	 * @throws NullPointerException if {@code capacity} is null
	 */
	public MemoryCache(Long capacity) {
		if (capacity == null) {
			throw new NullPointerException("capacity must not be null");
		}
		this.data = new ConcurrentHashMap<String, Entry>(100);
		this.capacity = capacity;
		this.size = 0L;
	}

	/**
	 * Estimates the size of a row: the character count of every
	 * {@code String} value plus 8 for every {@code Long} value. Values of any
	 * other type (and nulls) contribute nothing.
	 */
	private static long getSize(Map<String, Object> row) {
		long total = 0L;
		for (Object cell : row.values()) {
			if (cell instanceof String) {
				total += ((String) cell).length();
			} else if (cell instanceof Long) {
				total += 8;
			}
		}
		return total;
	}

	/**
	 * Removes {@code entry} from the recency list. The entry must currently
	 * be linked; the caller must hold the instance monitor.
	 */
	private void unlink(Entry entry) {
		if (entry.prev == null) {
			lru = entry.next;
		} else {
			entry.prev.next = entry.next;
		}
		if (entry.next == null) {
			mru = entry.prev;
		} else {
			entry.next.prev = entry.prev;
		}
		entry.prev = null;
		entry.next = null;
	}

	/**
	 * Appends an unlinked {@code entry} at the most recently used end of the
	 * list. The caller must hold the instance monitor.
	 */
	private void setToMRU(Entry entry) {
		entry.prev = mru;
		entry.next = null;
		if (mru == null) {
			lru = entry;
		} else {
			mru.next = entry;
		}
		mru = entry;
	}

	/** Drops {@code victim} and everything it stores from the cache. */
	private void evict(Entry victim) {
		unlink(victim);
		data.remove(victim.key);
		size -= victim.size;
	}

	/**
	 * Looks up the entry for {@code path} and marks it most recently used.
	 *
	 * @param create whether to create (and link) the entry when it is missing
	 * @return the entry, or null if it is missing and {@code create} is false
	 */
	private Entry touch(String path, boolean create) {
		Entry entry = data.get(path);
		if (entry != null) {
			// Re-linking the current MRU would be a no-op; skip it.
			if (entry != mru) {
				unlink(entry);
				setToMRU(entry);
			}
		} else if (create) {
			entry = new Entry(path);
			data.put(path, entry);
			setToMRU(entry);
		}
		return entry;
	}

	/**
	 * Ensures {@code path} has an entry with a slot at {@code time}. A slot
	 * that does not exist yet is created holding null; an existing slot,
	 * including one with a stored row, is left untouched.
	 */
	private void markTime(String path, Long time) {
		Entry entry = touch(path, true);
		if (!entry.row.containsKey(time)) {
			entry.row.put(time, null);
		}
	}

	/** Fails fast so that a bad key cannot leave a half-created entry. */
	private static void requireKey(String path, Long time) {
		if (path == null || time == null) {
			throw new NullPointerException("path and time must not be null");
		}
	}

	/**
	 * Stores {@code row} for {@code path} at {@code time}.
	 *
	 * <p>If no row exists at {@code time}, the new row starts as a copy of
	 * the closest strictly earlier row of the same path (if that row is
	 * non-null). If a row already exists at {@code time}, it is updated in
	 * place. In both cases every non-null column of {@code row} is then
	 * written over the stored row.
	 *
	 * <p>Before storing, least recently used paths are evicted until the
	 * incoming data fits. This can evict {@code path} itself, in which case
	 * its earlier rows are lost. A row whose own size exceeds the capacity is
	 * not stored and nothing is evicted for it.
	 *
	 * <p>A null {@code row} only records that nothing is known at
	 * {@code time}, unless a row is already stored there.
	 *
	 * @param path cache key
	 * @param time version of the row
	 * @param row  columns to store, or null
	 * @throws NullPointerException if {@code path} or {@code time} is null
	 */
	public synchronized void put(String path, Long time, Map<String, Object> row) {
		requireKey(path, time);
		if (row == null) {
			markTime(path, time);
			return;
		}

		long incoming = getSize(row);
		if (incoming > capacity) {
			// Can never fit; flushing the cache for it would be pointless.
			return;
		}
		while (size + incoming > capacity && lru != null) {
			evict(lru);
		}

		Entry entry = touch(path, true);
		Map<String, Object> stored = entry.row.get(time);
		long sizeBefore = 0L;
		if (stored == null) {
			stored = new HashMap<String, Object>();
			SortedMap<Long, Map<String, Object>> earlier = entry.row.headMap(time);
			if (!earlier.isEmpty()) {
				Map<String, Object> previous = earlier.get(earlier.lastKey());
				if (previous != null) {
					stored.putAll(previous);
				}
			}
			entry.row.put(time, stored);
		} else {
			sizeBefore = getSize(stored);
		}
		for (Map.Entry<String, Object> cell : row.entrySet()) {
			if (cell.getValue() != null) {
				stored.put(cell.getKey(), cell.getValue());
			}
		}

		// Charge what is actually stored so the counters always equal the
		// sum of the stored rows, for new rows and overwrites alike.
		long delta = getSize(stored) - sizeBefore;
		entry.size += delta;
		size += delta;

		// Inherited columns can push the total past the estimate used above;
		// trim other paths, but never the one that was just written.
		while (size > capacity && lru != entry) {
			evict(lru);
		}
	}

	/**
	 * Returns a copy of the row of {@code path} stored at the greatest time
	 * {@code <= time}. If {@code path} is cached it becomes the most recently
	 * used entry, whether or not a row is found.
	 *
	 * @param path cache key
	 * @param time point in time to look up
	 * @return a fresh map with the row's columns, or null if the path is not
	 *         cached, has no slot at or before {@code time}, or the closest
	 *         slot holds no data
	 */
	public synchronized Map<String, Object> get(String path, Long time) {
		Entry entry = touch(path, false);
		if (entry == null) {
			return null;
		}
		Long closestTime;
		if (entry.row.containsKey(time)) {
			closestTime = time;
		} else {
			// headMap(time) rather than headMap(time + 1): no overflow at
			// Long.MAX_VALUE.
			SortedMap<Long, Map<String, Object>> earlier = entry.row.headMap(time);
			if (earlier.isEmpty()) {
				return null;
			}
			closestTime = earlier.lastKey();
		}
		Map<String, Object> stored = entry.row.get(closestTime);
		if (stored == null) {
			return null;
		}
		return new HashMap<String, Object>(stored);
	}

	/**
	 * Records that nothing is known for {@code path} at {@code time}: creates
	 * the path if needed, marks it most recently used and adds an empty slot
	 * at {@code time}, so that lookups at or after {@code time} no longer
	 * fall back to an earlier row.
	 *
	 * <p>NOTE(review): a row already stored at exactly {@code time} is left
	 * in place, matching the previous behaviour; confirm with callers whether
	 * it should be discarded instead.
	 *
	 * @param path cache key
	 * @param time version to mark
	 * @throws NullPointerException if {@code path} or {@code time} is null
	 */
	public synchronized void invalidate(String path, Long time) {
		requireKey(path, time);
		markTime(path, time);
	}
}
